package com.cs.rabbittest.demo.topics;

import com.cs.rabbittest.demo.BaseFactory;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author james
 */
public class Producer04_topics {

    /**
     * Email队列名称
     */
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    /**
     * email routing key
     * 队列绑定交换机指定通配符：
     * 统配符规则：
     * 中间以“.”分隔。
     * 符号#可以匹配多个词，符号*可以匹配一个词语。
     */
    private static final String ROUTING_KEY_EMAIL = "inform.#.email.#";
    /**
     * SMS 队列名称
     */
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    /**
     * SMS routing key
     * 队列绑定交换机指定通配符：
     * 统配符规则：
     * 中间以“.”分隔。
     * 符号#可以匹配多个词，符号*可以匹配一个词语。
     */
    private static final String ROUTING_KEY_SMS = "inform.#.sms.#";

    /**
     * 交换机名称
     */
    private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

    private static final int COUNT = 10;

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = BaseFactory.getConnection();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        /**
         * 声明交换机
         * @param1 交换机名称
         * @param2 交换机类型 fanout,topic,direct,headers
         */
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        // 声明队列
        /**
         * @param1 队列名称
         * @param2 是否持久化
         * @param3 是否独占此队列
         * @param4 队列不用是否删除
         * @param5 参数
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        // 绑定队列和交换机
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM, ROUTING_KEY_EMAIL);
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM, ROUTING_KEY_SMS);
        // 开启确认模式
        channel.confirmSelect();
        //发送邮件消息
        for (int i = 0; i < COUNT; i++) {
            String message = String.format("TOPICS email inform to user %d", i);
            channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", MessageProperties.MINIMAL_BASIC, message.getBytes("UTF-8"));

        }
        // 等待回复，如果回复true
        try {
            if (channel.waitForConfirms()) {
                System.out.printf("TOPICS email 批量发送消息成功\n");
            } else {
                System.out.println("TOPICS email 批量发送失败");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 发送短信
        for (int i = 0; i < COUNT; i++) {
            String message = String.format("TOPICS SMS inform to user %d", i);
            channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", MessageProperties.MINIMAL_BASIC, message.getBytes("UTF-8"));

        }
        // 等待回复，如果回复true
        try {
            if (channel.waitForConfirms()) {
                System.out.printf("TOPICS_SMS 批量发送消息成功\n");
            } else {
                System.out.println("TOPICS_SMS 批量发送失败");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // TODO 两个都发一波
        for (int i = 0; i < COUNT; i++) {
            String message = String.format("SMS 和 Email inform to User%d", i);
            channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", MessageProperties.MINIMAL_BASIC, message.getBytes("UTF-8"));

        }
        // 等待回复，如果回复true
        try {
            if (channel.waitForConfirms()) {
                System.out.printf("TOPICS 2个都批量发送消息成功\n");
            } else {
                System.out.println("TOPICS 2个都批量发送失败");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        if (channel != null) {
            channel.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
